package server

import (
	"github.com/IBM/sarama"
	"kratos_kafka/internal/conf"
	"kratos_kafka/internal/service"
)

// NewSaramaKafkaConsumers constructs every Kafka consumer used by this
// package and returns them together in one slice, so that wire dependency
// injection can initialize all of them through a single provider.
//
// Each constructor is given the same bootConf and greeterService, and the
// constructors are invoked in the order they are listed. The topic/group
// names in the comments below are carried over from the original notes;
// the actual values are presumably set inside the individual constructors.
func NewSaramaKafkaConsumers(bootConf *conf.Bootstrap, greeterService *service.GreeterService) []*sarama.ConsumerGroup {
	return []*sarama.ConsumerGroup{
		// topic: real_topic
		// group: real_group
		NewSaramaKafkaRealConsumerServer(bootConf, greeterService),

		// topic: delay_topic1
		// group: delay_group1
		// Once a message satisfies the delay condition (msg1_delay_secs in
		// the configuration), it is forwarded to real_topic.
		NewSaramaKafkaDelayConsumerServer(bootConf, greeterService),

		// Notice: used to test whether data in new partitions is consumed
		// automatically after the Kafka topic's partitions are expanded.
		// topic: article_to3
		// group: article_gr3
		NewSaramaKafkaArticleTo3ConsumerServer(bootConf, greeterService),

		// Notice: used to test consuming data from specific Kafka partitions.
		// topic: my-topic
		// group: my-group
		NewSaramaKafkaArticlePartitionsConsumerServer(bootConf, greeterService),
	}
}
